package com.dahuan.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class State_Checkpoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.检查点配置
        //状态检查点之间的时间间隔（以毫秒为单位）
        env.enableCheckpointing( 300L );

        //TODO 高级选项
        //检查点模式定义了在出现故障时系统可以保证的一致性。
        env.getCheckpointConfig().setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE );

        //设置检查点在被丢弃之前可能花费的最长时间。
        env.getCheckpointConfig().setCheckpointTimeout( 60000L );

        //并发检查点尝试的最大次数。
        env.getCheckpointConfig().setMaxConcurrentCheckpoints( 2 );

        //触发下一个检查点之前的最小暂停。
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints( 100L );

        //设置是否有较新的保存点时作业恢复是否应回退到检查点。
        env.getCheckpointConfig().setPreferCheckpointForRecovery( true );

        //设置可容忍的检查点失败数，默认值为0，表示我们不容忍任何检查点失败。
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber( 0 );

        //2.重启策略
        //重新启动要配置的策略配置
        //固定延迟重启
        /**
         * 生成FixedDelayRestartStrategyConfiguration。
         * @param restartAttempts FixedDelayRestartStrategy的重启尝试次数
         * @param delayBetweenAttempts FixedDelayRestartStrategy的重启尝试之间的延迟
         * @return FixedDelayRestartStrategy
         */
        //TODO 每隔10秒中重启尝试3次
        env.setRestartStrategy( RestartStrategies.fixedDelayRestart( 3, 10000L ) );

        //失败率重启
        /**
         * 生成FailureRateRestartStrategyConfiguration。
         * @param failureRate作业失败之前，给定间隔{@code failureInterval}中的最大重新启动次数
         * @param failureInterval失败的时间间隔
         * @param delayInterval重新启动尝试之间的延迟
         */
        //TODO 10分钟之内尝试三次，每次时间间隔为1分钟
        env.setRestartStrategy( RestartStrategies.failureRateRestart( 3, Time.minutes( 10 ), Time.minutes( 1 ) ) );


        env.execute( "State_Checkpoint" );


    }
}
